﻿using Models;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing;
using MQTTnet.Server;
using System.Text;

namespace Services
{
    public class MQTT_Client
    {
        #region 全局变量与枚举
        private IMqttClient mqttClient;                                     //MQTT客户端连接对象
        public Model_MQTTLoginInfo model_login;
        public List<string> topicsNeedSubscribe;
        public bool isConnected
        {
            get
            {
                if (mqttClient == null)
                {
                    return false;
                }
                else
                {
                    return mqttClient.IsConnected;
                }
            }
        }                                         //调用方获取是否已经连接的属性
        #endregion
        #region 委托与事件
        public delegate void MQTT_MessageReceivedHandler(MQTT_Message msg);//委托
        public event MQTT_MessageReceivedHandler onMQTTMessageReceived;//MQTT接收到消息事件
        public delegate void AfterReconnectedHandler();//断线重连后做的事情的委托
        public event AfterReconnectedHandler onReconnected;//断线重连后做的事情事件
        #endregion
        #region 构造与析构函数
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="modelLogin">MQTT的连接信息</param>
        public MQTT_Client(Model_MQTTLoginInfo modelLogin)
        {
            this.model_login = modelLogin;
        }
        #endregion
        /// <summary>
        /// 连接服务器
        /// </summary>
        /// <returns></returns>
        public FuncResult<bool> ConnectToServer()
        {
            var result = new FuncResult<bool>();
            try
            {
                // 1. 创建 MQTT 客户端
                mqttClient = new MqttFactory().CreateMqttClient();
                // 2 . 设置 MQTT 客户端选项
                MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder();
                // 设置服务器端地址
                optionsBuilder.WithTcpServer(this.model_login.strIPAddress, this.model_login.intPort);
                // 设置鉴权参数
                optionsBuilder.WithCredentials(this.model_login.strLoginUserName, this.model_login.strLoginPassword);
                // 设置客户端序列号
                optionsBuilder.WithClientId(this.model_login.strClientID);
                // 创建选项
                IMqttClientOptions options = optionsBuilder.Build();
                // 设置消息接收处理程序
                mqttClient.UseApplicationMessageReceivedHandler(args =>
                {
                    // 收到的消息主题
                    string topic = args.ApplicationMessage.Topic;
                    // 收到的的消息内容
                    string payload = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
                    // 收到的发送级别(Qos)
                    var qos = args.ApplicationMessage.QualityOfServiceLevel;
                    // 收到的消息保持形式
                    bool retain = args.ApplicationMessage.Retain;
                    //将消息推送至调用方
                    MQTT_Message msg = new MQTT_Message() { text = payload, Topic = topic };
                    if (onMQTTMessageReceived != null) onMQTTMessageReceived(msg);
                });
                // 重连机制
                mqttClient.UseDisconnectedHandler(e =>
               {
                   //Console.WriteLine("与服务器之间的连接断开了，正在尝试重新连接");
                   // 等待 5s 时间
                   Task.Delay(TimeSpan.FromSeconds(5));
                   try
                   {
                       // 重新连接
                       var t = mqttClient.ConnectAsync(options);
                       t.Wait();
                       //订阅主题
                       if (mqttClient.IsConnected)
                       {
                           foreach (var item in topicsNeedSubscribe)
                           {
                               FuncResult<bool> result = new FuncResult<bool>();
                               result = this.ClientSubscribeTopic(item);
                               if (result.bSuccess == false) mqttClient.DisconnectAsync();
                               break;
                           }
                       }
                       //重新连接后做的事情
                       if (this.onReconnected != null) onReconnected();
                   }
                   catch (Exception ex)
                   {

                   }
               });
                // 连接到服务器
                var connectResult = mqttClient.ConnectAsync(options);
                connectResult.Wait();
                result.bSuccess = mqttClient.IsConnected;
                result.strMsg = "";
                //订阅主题
                if (mqttClient.IsConnected)
                {
                    foreach (var item in topicsNeedSubscribe)
                    {
                        FuncResult<bool> result_Subscribe = new FuncResult<bool>();
                        result = this.ClientSubscribeTopic(item);
                        if (result.bSuccess == false) mqttClient.DisconnectAsync();
                    }
                }
            }
            catch (Exception ex)
            {
                result.bSuccess = false;
                result.strMsg = ex.Message;
            }
            return result;
        }
        /// <summary>
        /// 断开连接
        /// </summary>
        /// <returns></returns>
        public FuncResult<bool> Disconnect()
        {
            var result = new FuncResult<bool>();
            try
            {
                if (this.mqttClient == null)
                {
                    result.strMsg = "MQTT客户端未进行初始化";
                    result.bSuccess = false;
                    return result;
                }
                var task = this.mqttClient.DisconnectAsync();
                result.bSuccess = true;
                result.strMsg = "";
            }
            catch (Exception ex)
            {
                result.bSuccess = false;
                result.strMsg = ex.Message;
            }
            return result;
        }
        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="_topic"></param>
        /// <returns></returns>
        public FuncResult<bool> ClientSubscribeTopic(string _topic)
        {
            var result = new FuncResult<bool>();
            try
            {
                if (this.mqttClient == null)
                {
                    result.strMsg = "尚未连接到服务器";
                    result.bSuccess = false;
                    return result;
                }
                if (this.mqttClient.IsConnected == false)
                {
                    result.strMsg = "当前未处于连接状态";
                    result.bSuccess = false;
                    return result;
                }
                _topic = _topic.Trim();
                if (string.IsNullOrEmpty(_topic))
                {
                    result.strMsg = "订阅主题不能为空！";
                    result.bSuccess = false;
                    return result;
                }
                // 判断客户端是否连接
                if (!mqttClient.IsConnected)
                {
                    result.strMsg = "MQTT 客户端尚未连接!";
                    result.bSuccess = false;
                    return result;
                }
                // 设置订阅参数
                var subscribeOptions = new MqttClientSubscribeOptionsBuilder().WithTopicFilter(_topic).Build();
                // 订阅
                var res = mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
                res.Wait();
                if (res.Exception == null)
                {
                    result.bSuccess = true;
                    result.strMsg = "";
                }
                else
                {
                    result.bSuccess = false;
                    result.strMsg = res.Exception.Message;
                }
            }
            catch (Exception ex)
            {
                result.bSuccess = false;
                result.strMsg = ex.Message;
            }
            return result;
        }
        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <returns></returns>
        public FuncResult<bool> ClientUnsubscribeTopic(string strTopic)
        {
            var result = new FuncResult<bool>();
            try
            {
                if (this.mqttClient == null)
                {
                    result.strMsg = "尚未连接到服务器";
                    result.bSuccess = false;
                    return result;
                }
                if (this.mqttClient.IsConnected == false)
                {
                    result.strMsg = "当前未处于连接状态";
                    result.bSuccess = false;
                    return result;
                }
                if (string.IsNullOrEmpty(strTopic))
                {
                    result.strMsg = "退订主题不能为空！";
                    result.bSuccess = false;
                    return result;
                }
                // 判断客户端是否连接
                if (!mqttClient.IsConnected)
                {
                    result.strMsg = "MQTT 客户端尚未连接!";
                    result.bSuccess = false;
                    return result;
                }
                // 设置订阅参数
                var subscribeOptions = new MqttClientUnsubscribeOptionsBuilder().WithTopicFilter(strTopic).Build();
                // 退订
                var res = mqttClient.UnsubscribeAsync(subscribeOptions, CancellationToken.None);
                res.Wait();
                if (res.Exception == null)
                {
                    result.bSuccess = true;
                    result.strMsg = "";
                }
                else
                {
                    result.bSuccess = false;
                    result.strMsg = res.Exception.Message;
                }
            }
            catch (Exception ex)
            {
                result.bSuccess = false;
                result.strMsg = ex.Message;
            }
            return result;

        }
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public FuncResult<bool> ClientPublish(MQTT_Message message)
        {
            var result = new FuncResult<bool>();
            try
            {
                message.text = message.text.Trim();
                // 判断客户端是否连接
                if (!mqttClient.IsConnected)
                {
                    ConnectToServer();
                }
                // 填充消息
                var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(message.Topic)       // 主题
                    .WithPayload(message.text)   // 消息
                    .WithExactlyOnceQoS()   // qos
                    .WithRetainFlag()       // retain
                    .Build();
                var res = mqttClient.PublishAsync(applicationMessage);
                res.Wait();
                if (res.Result.ReasonCode == MqttClientPublishReasonCode.Success)
                {
                    result.bSuccess = true;
                    result.strMsg = "";
                }
                else
                {
                    result.bSuccess = false;
                    result.strMsg = res.Result.ReasonString;
                }
            }
            catch (Exception ex)
            {
                result.bSuccess = false;
                result.strMsg = ex.Message;
            }
            return result;
        }
    }
}
